package com.itasura.guava.Concurrency;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 修饰符和类型	方法和描述
 * double	acquire() 从RateLimiter获取一个许可，该方法会被阻塞直到获取到请求
 * double	acquire(int permits)从RateLimiter获取指定许可数，该方法会被阻塞直到获取到请求
 * static RateLimiter	create(double permitsPerSecond)根据指定的稳定吞吐率创建RateLimiter，这里的吞吐率是指每秒多少许可数（通常是指QPS，每秒多少查询）
 * static RateLimiter	create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)根据指定的稳定吞吐率和预热期来创建RateLimiter，这里的吞吐率是指每秒多少许可数（通常是指QPS，每秒多少个请求量），在这段预热时间内，RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。（只要存在足够请求数来使其饱和）
 * double	getRate()返回RateLimiter 配置中的稳定速率，该速率单位是每秒多少许可数
 * void	setRate(double permitsPerSecond)更新RateLimite的稳定速率，参数permitsPerSecond 由构造RateLimiter的工厂方法提供。
 * String	toString()返回对象的字符表现形式
 * boolean	tryAcquire()从RateLimiter 获取许可，如果该许可可以在无延迟下的情况下立即获取得到的话
 * boolean	tryAcquire(int permits)从RateLimiter 获取许可数，如果该许可数可以在无延迟下的情况下立即获取得到的话
 * boolean	tryAcquire(int permits, long timeout, TimeUnit unit)从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话，或者如果无法在timeout 过期之前获取得到许可数的话，那么立即返回false （无需等待）
 * boolean	tryAcquire(long timeout, TimeUnit unit)从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话，或者如果无法在timeout 过期之前获取得到许可的话，那么立即返回false（无需等待）
 * <p>
 * <p>
 * 基于令牌桶算法的实现类
 *
 * @author sailor wang
 * @date 2018/10/31 2:04 PM
 * @description
 */
@Slf4j
public class RateLimiterTest {

    // 流量限制阻塞版
    @Test
    public void ratelimiterTest() {
        RateLimiter rateLimiter = RateLimiter.create(1.0);//速率，每秒处理1个请求
        List<UserRequest> requestList = Lists.newArrayList();
        for (int i = 1; i <= 10; i++) {
            requestList.add(new UserRequest(i));
        }

        ExecutorService es = Executors.newCachedThreadPool();
        for (UserRequest request : requestList) {
            Double waitTime = rateLimiter.acquire();
            log.info("等待时间 -> {}", waitTime);
            es.submit(request);
        }
    }

    // 流量限制非阻塞版
    @Test
    public void unblockRatelimiterTest() throws InterruptedException {
        RateLimiter rateLimiter = RateLimiter.create(2.0);//速率，每秒处理1个请求
        List<UserRequest> requestList = Lists.newArrayList();
        for (int i = 1; i <= 50; i++) {

            requestList.add(new UserRequest(i));
        }
        ExecutorService es = Executors.newCachedThreadPool();
        for (UserRequest request : requestList) {
            TimeUnit.MILLISECONDS.sleep(30);//假设每个请求不均匀，间隔30毫秒
            Boolean acquire = rateLimiter.tryAcquire();
            if (acquire) {
                log.info("正常处理请求");
                es.submit(request);
            } else {
                log.info("丢弃请求");
            }


        }
    }

    private static class UserRequest implements Runnable {
        private int id;

        public UserRequest(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            log.info("用户请求 -> {}", id);
        }
    }
}